import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

/// Test page that registers a collection of [StreamCompleter] scenarios.
///
/// Every case is registered from the constructor through `test` / `group`,
/// which are not defined in this file (presumably inherited from [TestPage] —
/// confirm in `../common/test_page.dart`).
///
/// NOTE(review): `expect` is called with a single argument throughout, so it
/// is not the two-argument `package:test` matcher form; presumably it records
/// or displays the given value instead of comparing it — confirm against its
/// definition. `flushMicrotasks`, `unreachable` and `UnusableStream` are also
/// external helpers (presumably from `package:async_test/src/utils.dart`).
class StreamCompleterTestPage extends TestPage {
  /// Registers all test cases; `title` is forwarded to the superclass.
  StreamCompleterTestPage(super.title) {
    // The source stream is linked before the completer's stream is listened to.
    test('StreamCompleter().setSourceStream()流在侦听之前已链接', () async {
      var completer = StreamCompleter();
      completer.setSourceStream(createStream());
      expect(completer.stream.toList());
    });

    // The completer's stream is listened to before the source is linked.
    test('链接流之前已收听', () async {
      var completer = StreamCompleter();
      // toList() subscribes immediately; the source is attached only after
      // the flushMicrotasks() await below.
      var done = completer.stream.toList();
      await flushMicrotasks();
      completer.setSourceStream(createStream());
      expect(done);
    });

    // Cancelling before a source is linked: the source set afterwards is an
    // UnusableStream (presumably one that fails if listened to — TODO confirm).
    test("链接流之前取消不侦听流", () async {
      var completer = StreamCompleter();
      var subscription = completer.stream.listen(null);
      subscription.pause();
      subscription.cancel();
      completer.setSourceStream(UnusableStream());
    });

    // Listen and pause before the source stream is linked.
    test('链接流之前先听并暂停', () async {
      var controller = StreamCompleter();
      var events = [];
      var subscription = controller.stream.listen(events.add);
      var done = subscription.asFuture();
      subscription.pause();
      // Four events are added to the source while the subscription is paused
      // and before the source is linked.
      var sourceController = StreamController();
      sourceController
        ..add(1)
        ..add(2)
        ..add(3)
        ..add(4);
      controller.setSourceStream(sourceController.stream);
      await flushMicrotasks();
      // State snapshot while still paused.
      expect(sourceController.hasListener);
      expect(sourceController.isPaused);
      expect(events);
      subscription.resume();
      await flushMicrotasks();
      // State snapshot after resuming.
      expect(sourceController.hasListener);
      expect(sourceController.isPaused);
      expect(events);
      // Closing the source is what lets the asFuture() above complete.
      sourceController.close();
      await done;
      expect(events);
    });

    // Pause more than once before the source is linked.
    test('暂停不止一次', () async {
      var completer = StreamCompleter();
      var events = [];
      var subscription = completer.stream.listen(events.add);
      var done = subscription.asFuture();
      // Three pauses, matched by the three resume() calls in the loop below.
      subscription.pause();
      subscription.pause();
      subscription.pause();
      completer.setSourceStream(createStream());
      for (var i = 0; i < 3; i++) {
        await flushMicrotasks();
        // Snapshot of the received events before each resume.
        expect(events);
        subscription.resume();
      }
      await done;
      expect(events);
    });

    // Cancel the completer's stream before the source is done.
    test('在源完成之前取消新流', () async {
      var completer = StreamCompleter<int>();
      var lastEvent = -1;
      var controller = StreamController<int>();
      // `late` so that the data handler can refer to the subscription that is
      // being created by this very listen() call.
      late StreamSubscription subscription;
      subscription = completer.stream.listen((value) {
        expect(value);
        lastEvent = value;
        // Cancel from inside the handler once the value 2 arrives.
        if (value == 2) {
          subscription.cancel();
        }
      },
          onError: unreachable('error'),
          onDone: unreachable('done'),
          cancelOnError: true);
      completer.setSourceStream(controller.stream);
      // Checked synchronously, before any microtask has run.
      expect(controller.hasListener);

      await flushMicrotasks();
      expect(controller.hasListener);
      controller.add(1);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
      controller.add(2);

      await flushMicrotasks();
      // Snapshot after the handler has had the chance to cancel on value 2.
      expect(lastEvent);
      expect(controller.hasListener);
    });

    // setEmpty() is called before the stream is listened to.
    test('StreamCompleter().setEmpty(), StreamCompleter().stream.listen()在侦听之前使用setEmpty完成', () async {
      var completer = StreamCompleter();
      completer.setEmpty();
      var done = Completer();
      // Only onDone is a real handler; data and error use unreachable(...)
      // (presumably a callback that fails the test if invoked — TODO confirm).
      completer.stream.listen(unreachable('data'),
          onError: unreachable('error'), onDone: done.complete);
      await done.future;
      expect(completer);
    });

    // setEmpty() is called after the stream is listened to.
    test('侦听后用setEmpty完成', () async {
      var completer = StreamCompleter();
      var done = Completer();
      completer.stream.listen(unreachable('data'),
          onError: unreachable('error'), onDone: done.complete);
      completer.setEmpty();
      await done.future;
      expect(completer);
    });

    // Source listener state before and after the completer's stream is
    // listened to.
    test("直到完成流", () async {
      var completer = StreamCompleter();
      // `late` so that the onListen callback can refer to the controller.
      late StreamController controller;
      // The source closes itself in a microtask as soon as it gets a listener.
      controller = StreamController(onListen: () {
        scheduleMicrotask(controller.close);
      });

      completer.setSourceStream(controller.stream);
      await flushMicrotasks();
      // Snapshot before anyone listens to completer.stream.
      expect(controller.hasListener);
      var subscription = completer.stream.listen(null);
      // Snapshot right after listening.
      expect(controller.hasListener);
      await subscription.asFuture();
    });

    // cancelOnError is true and the listener is attached before the source is
    // linked.
    test('cancelOnError在链接流之前侦听时为true', () async {
      var completer = StreamCompleter<Object>();
      Object lastEvent = -1;
      var controller = StreamController<Object>();
      // Both data and error handlers store what they received in lastEvent.
      completer.stream.listen((value) {
        expect(value);
        lastEvent = value;
      }, onError: (Object value) {
        expect(value);
        lastEvent = value;
      }, onDone: unreachable('done'), cancelOnError: true);
      completer.setSourceStream(controller.stream);
      expect(controller.hasListener);

      await flushMicrotasks();
      expect(controller.hasListener);
      controller.add(1);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
      controller.add(2);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
      // The error event is the one that exercises cancelOnError.
      controller.addError('3');

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
    });

    // cancelOnError is true and the listener is attached after the source is
    // linked.
    test('链接流后侦听时cancelOnError为true', () async {
      var completer = StreamCompleter<Object>();
      Object lastEvent = -1;
      var controller = StreamController<Object>();
      completer.setSourceStream(controller.stream);
      // The first event is added before anyone listens to completer.stream.
      controller.add(1);
      expect(controller.hasListener);

      completer.stream.listen((value) {
        expect(value);
        lastEvent = value;
      }, onError: (Object value) {
        expect(value);
        lastEvent = value;
      }, onDone: unreachable('done'), cancelOnError: true);

      expect(controller.hasListener);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
      controller.add(2);

      await flushMicrotasks();
      expect(lastEvent);
      expect(controller.hasListener);
      controller.addError('3');

      await flushMicrotasks();
      expect(controller.hasListener);
    });

    // Completing a second time after setSourceStream(), source set before
    // listening.
    // NOTE(review): in this and the following cases `expect` receives a
    // closure that completes the completer again; presumably the closure is
    // expected to throw — confirm how `expect` treats closures.
    test('在侦听之前在setSourceStream之后链接流', () async {
      var completer = StreamCompleter();
      completer.setSourceStream(createStream());
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
      // Drain the stream, then repeat the same two attempts.
      await completer.stream.toList();
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
    });

    // Completing a second time after setSourceStream(), listening first.
    test('侦听后在setSourceStream之后链接流', () async {
      var completer = StreamCompleter();
      var list = completer.stream.toList();
      completer.setSourceStream(createStream());
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
      await list;
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
    });

    // Completing a second time after setEmpty(), listening afterwards.
    test('在setEmpty之后链接流再侦听', () async {
      var completer = StreamCompleter();
      completer.setEmpty();
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
      await completer.stream.toList();
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
    });

    // Completing a second time after setEmpty(), listening first.
    test('侦听后在setEmpty()之后链接流', () async {
      var completer = StreamCompleter();
      var list = completer.stream.toList();
      completer.setEmpty();
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
      await list;
      expect(() => completer.setSourceStream(createStream()));
      expect(() => completer.setEmpty());
    });

    // Listening to completer.stream more than once after the source is set.
    test('设置流后侦听不止一次', () async {
      var completer = StreamCompleter();
      completer.setSourceStream(createStream());
      var list = completer.stream.toList();
      // Second listen attempt while the first is still active ...
      expect(() => completer.stream.toList());
      await list;
      // ... and again after the first has completed.
      expect(() => completer.stream.toList());
    });

    // Listening to completer.stream more than once before any source is set.
    test('设置流之前听多次', () async {
      var completer = StreamCompleter();
      completer.stream.toList();
      expect(() => completer.stream.toList());
    });

    // Handlers (onData / onError / onDone) are set both before and after the
    // source stream is linked.
    test('设置流之前和之后设置onData等', () async {
      var completer = StreamCompleter<int>();
      var controller = StreamController<int>();
      var subscription = completer.stream.listen(null);
      Object lastEvent = 0;
      // First set of handlers, installed before the source is linked:
      // data is stored as-is, errors as their string form, done as -1.
      subscription.onData((value) => lastEvent = value);
      subscription.onError((value) => lastEvent = '$value');
      subscription.onDone(() => lastEvent = -1);
      completer.setSourceStream(controller.stream);
      await flushMicrotasks();
      controller.add(1);
      await flushMicrotasks();
      expect(lastEvent);
      controller.addError(2);
      await flushMicrotasks();
      expect(lastEvent);
      // Second set of handlers, installed after the source is linked:
      // data and errors are negated so they differ from the first set.
      subscription.onData((value) => lastEvent = -value);
      subscription.onError((value) => lastEvent = '${-(value as int)}');
      controller.add(1);
      await flushMicrotasks();
      expect(lastEvent);
      controller.addError(2);
      await flushMicrotasks();
      expect(lastEvent);
      // onDone was not replaced, so the original handler is still installed.
      controller.close();
      await flushMicrotasks();
      expect(lastEvent);
    });

    // pause() with a resume future that spans the point where the source is
    // linked.
    test('暂停w/恢复未来交叉设置流', () async {
      var completer = StreamCompleter();
      var resume = Completer();
      var subscription = completer.stream.listen(unreachable('data'));
      // Paused until resume.future completes.
      subscription.pause(resume.future);
      await flushMicrotasks();
      completer.setSourceStream(createStream());
      await flushMicrotasks();
      resume.complete();
      // The unreachable data handler is replaced right after the resume
      // future is completed, in the same synchronous run.
      var events = [];
      subscription.onData(events.add);
      await subscription.asFuture();
      expect(events);
    });

    // asFuture() is requested before the source is linked and the source then
    // emits an error.
    test('asFuture在设置流中出现错误', () async {
      var completer = StreamCompleter();
      var controller = StreamController();
      var subscription =
      completer.stream.listen(unreachable('data'), cancelOnError: false);
      var done = subscription.asFuture();
      // Snapshot before the source is linked.
      expect(controller.hasListener);
      completer.setSourceStream(controller.stream);
      await flushMicrotasks();
      expect(controller.hasListener);
      controller.addError(42);
      // The success callback is unreachable(...); only the error path passes
      // the received error to expect.
      await done.then(unreachable('data'), onError: (error) {
        expect(error);
      });
      expect(controller.hasListener);
    });

    // Cases for completing the stream with setError().
    group('StreamCompleter().setError()', () {
      // setError() is called after the stream is listened to.
      test('StreamCompleter().setError() 生成一个发出单个错误的流', () {
        var completer = StreamCompleter();
        completer.stream.listen(unreachable('data'),
            onError: (error, stackTrace) {
              expect(error);
            }, onDone: () {});

        completer.setError('oh no');
      });

      // setError() is called first; the stream is listened to later, after a
      // flushMicrotasks() await.
      test('生成一个流，在以后的侦听中发出一个错误', () async {
            var completer = StreamCompleter();
            completer.setError('oh no');
            await flushMicrotasks();

            completer.stream.listen(unreachable('data'),
                onError: (error, stackTrace) {
                  expect(error);
                }, onDone: () {});
          });
    });
  }

}

/// Produces the events 1, 2, 3 and 4 in order, then closes.
///
/// `flushMicrotasks()` is awaited between consecutive events, but neither
/// before the first event nor after the last one.
Stream<int> createStream() async* {
  const lastValue = 4;
  for (var next = 1; next <= lastValue; next++) {
    // Every event except the first is preceded by a microtask flush.
    if (next > 1) {
      await flushMicrotasks();
    }
    yield next;
  }
}